日志收集详解之logstash解析日志格式(一)

您所在的位置:网站首页 日志编码格式 plain 日志收集详解之logstash解析日志格式(一)

日志收集详解之logstash解析日志格式(一)

2024-06-06 18:15| 来源: 网络整理| 查看: 265

此系列文章一共分为三部分,分为 filebeat 部分,logstash 部分,es 部分。通过此系列的文章,可以快速了解整个日志收集的大概,本篇主要讲解logstash这一块

目录1. logstash 介绍2. logstash 工作原理2.1 输入端2.2 过滤器2.3 输出端3. logstash 容器化部署3.1 configmap 文件参考3.1.1 关于配置项需要做下简单说明3.1.1.1 INPUT3.1.1.2 OUTPUT3.2 deployment 文件参考4. logstash 的进阶使用4.1 需求介绍4.2 一步步的去解析日志4.2.1 首先进行日志格式化,取出我们想要的日志4.2.1 删除不必要的字段4.2.2 将所需日志进行 json 解析4.2.3 优化数组的结构4.2.4 转换数据类型5. 总结这篇文章只说了logstash的其中一种日志处理方式,用的是它自带的一些插件,基本上可以满足我们日常的一些需求,但是如果加入一些逻辑处理的话,我们也可以通过自定义ruby代码段来进行处理,下一篇文章将介绍结合ruby的日志处理。

1. logstash 介绍

版本:logstash-7.12.0

logstash就是用来处理数据的,通过建一个管道,将数据按照不同的阶段,进行处理,并最终输出的一个过程,以输入到elasticsearch为例,如下图:

logstash

basic logstash pipeline

2. logstash 工作原理

Logstash 事件处理管道有三个阶段:输入 → 过滤 → 输出。输入生成事件,过滤器修改事件,然后输出到其他地方。输入和输出支持编解码器,使您能够在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。

参考官当文档:https://www.elastic.co/guide/en/logstash/current/pipeline.html#pipeline

2.1 输入端

input: 管道的输入端,可以将数据通过配置 input 输入到 logstash 的管道中,常用的输入插件有:

kafka redis file syslog beats 2.2 过滤器

过滤器是 Logstash 管道中的中间处理设备。您可以将筛选器与条件组合在一起,以便在事件满足特定条件时对其执行操作。一些有用的过滤器包括:

grok: 解析和构造任意文本。Grok 是目前 Logstash 中解析非结构化日志数据为结构化和可查询数据的最佳方式。Logstash 内置了 120 个模式,你很可能会找到一个满足你需要的模式! mutate: 对事件字段执行通用转换。您可以重命名、删除、替换和修改事件中的字段。 drop: 完全删除事件,例如 debug 事件。 clone: 创建事件的副本,可以添加或删除字段。 geoip: 添加关于 IP 地址的地理位置的信息。 json: 对 json 格式的数据进行处理。 json_encode: 转换成 json 格式的数据。 2.3 输出端

输出是 Logstash 管道的最后阶段。事件可以通过多个输出,但是一旦所有输出处理完成,事件就完成了它的执行。一些常用的输出包括:

elasticsearch: 发送事件数据到 elasticsearch file: 将事件数据写入磁盘文件。 3. logstash 容器化部署

容器化部署时直接将官方镜像拿过来,通过 k8s 的Deployment资源类型进行部署即可。 官方镜像地址:

https://www.elastic.co/guide/en/logstash/master/docker.html https://hub.docker.com/_/logstash 3.1 configmap 文件参考

下面的这个configmap中input通过配置项topics_pattern指定一个正则规则来灵活的去匹配一组 topic(当然也可以是用topics来指定具体的一组 topic), 然后这边没有使用filter做处理,直接输出到elasticsearch中。

全局配置文件

apiVersion: v1 data: logstash.yml: |- http.host: "0.0.0.0" pipeline.workers: 2 pipeline.batch.size: 250 pipeline.batch.delay: 50 xpack.management.enabled: false kind: ConfigMap metadata: name: logstash-config-global namespace: ops-logging

业务相关的配置文件

kind: ConfigMap apiVersion: v1 metadata: name: logstash-config-a namespace: ops-logging data: k8s.conf: |- input { kafka { bootstrap_servers => "10.127.91.90:9092,10.127.91.91:9092,10.127.91.92:9092" group_id => "k8s-hw-group" client_id => "k8s-hw-client" consumer_threads => 1 auto_offset_reset => latest topics_pattern => "k8s-hw.*" codec => "json" } } filter { } output { if [k8s][nameSpace] == "test" { elasticsearch { hosts => ["10.127.91.75:9200", "10.127.91.76:9200", "10.127.91.77:9200", "10.127.91.78:9200", "10.127.91.79:9200", "10.127.91.80:9200", "10.127.91.81:9200"] index => "k8s-%{[k8s][k8sName]}-%{[k8s][nameSpace]}-%{+YYYYMMddHH}" sniffing => "true" timeout => 10 } } else { elasticsearch { hosts => ["10.127.91.75:9200", "10.127.91.76:9200", "10.127.91.77:9200", "10.127.91.78:9200", "10.127.91.79:9200", "10.127.91.80:9200", "10.127.91.81:9200"] index => "k8s-%{[k8s][k8sName]}-%{[k8s][nameSpace]}-%{+YYYYMMdd}" sniffing => "true" timeout => 10 } } } 3.1.1 关于配置项需要做下简单说明 3.1.1.1 INPUT bootstrap_servers 指定 kafka 地址 topics 表示一组确定的 topic topics_pattern 表示通过自定义正则来模糊匹配一组 topic auto_offset_reset这个字段,表示 Kafka 中没有初始偏移量或偏移量超出范围时的策略,其中 earliest: 从头开始消费 latest: 从最新的 offset 开始消费 none: 如果没有找到消费者组的先前偏移量,则向消费者抛出异常 anything else: 直接向消费者抛出异常 consumer_threads 消费者端的线程数,理想情况下,您应该拥有与分区数量相同的线程,以达到完美的平衡——线程数量超过分区意味着有些线程将处于空闲状态,比如说我有 4 个 partition,假如我只启动一个副本,那么这里最好设置成 4,如果我启动了 4 个副本,那么这里设置成 1 3.1.1.2 OUTPUT

output 设置了一个判断,用来对来自 k8s 命名空间的 topic 进行区分,由于我的test命名空间中的日志量比较大,所以我在建索引时,按小时进行索引,所以这边单独设置了下,而其他命名空间走默认的配置项即可

具体可参考官方文档: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

3.2 deployment 文件参考 apiVersion: apps/v1 kind: Deployment metadata: labels: app: logstash-k8s name: logstash-k8s namespace: ops-logging spec: progressDeadlineSeconds: 600 replicas: 0 revisionHistoryLimit: 10 selector: matchLabels: app: logstash-k8s strategy: rollingUpdate: maxSurge: 25% maxUnavailable: 25% type: RollingUpdate template: metadata: labels: app: logstash-k8s spec: containers: - args: - /usr/share/logstash/bin/logstash -f /usr/share/logstash/conf/k8s.conf command: - /bin/sh - -c image: docker.elastic.co/logstash/logstash:7.12.0 imagePullPolicy: IfNotPresent name: logstash-k8s resources: limits: cpu: "4" memory: 4G requests: cpu: "4" memory: 4G terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /usr/share/logstash/conf name: config-volume - mountPath: /usr/share/logstash/config/logstash.yml name: logstash-config readOnly: true subPath: logstash.yml - args: - -c - /opt/bitnami/logstash-exporter/bin/logstash_exporter --logstash.endpoint='http://localhost:9600' command: - /bin/sh image: bitnami/logstash-exporter:latest imagePullPolicy: IfNotPresent name: logstash-exporter-k8s ports: - containerPort: 9198 name: lg-exporter protocol: TCP resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File dnsPolicy: ClusterFirst restartPolicy: Always schedulerName: default-scheduler securityContext: runAsUser: 0 terminationGracePeriodSeconds: 30 volumes: - configMap: defaultMode: 420 items: - key: k8s.conf path: k8s.conf name: logstash-config-sg-saas-pro-hbali name: config-volume - configMap: defaultMode: 420 name: logstash-config-global name: logstash-config

logstash-exporter 的 svc 参考

apiVersion: v1 kind: Service metadata: name: logstash-exporter-a namespace: ops-logging spec: ports: - name: http port: 9198 protocol: TCP targetPort: 9198 nodePort: 30003 selector: app: logstash sessionAffinity: None type: NodePort

上面的话应该算是logstash最简单的配置了,假如我们想调试的话,可以把下面这段改下

containers: - args: - /usr/share/logstash/bin/logstash -f /usr/share/logstash/conf/k8s.conf

改成

containers: - args: - sleep 1000000

这样我们在调试时,可直接进入到容器中调试。

4. logstash 的进阶使用 4.1 需求介绍

2021-08-01 12:26:04.063 INFO 24 --- [traceId=edda5daxxxxxxxxxcfa3387d48][ xnio-1 task-1] c.g.c.gateway.filter.AutoTestFilter : {"traceId":"edda5da8xxxxxxxxxxxxxxxxxxx387d48","headers":[{"x-forwarded-proto":"http,http","x-tenant-id":"123","x-ca-key":"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637","x-forwarded-port":"80,80","x-forwarded-for":"10.244.2.0","x-ca-client-ip":"10.244.2.0","x-product-code":"xxxxx","authorization":"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899","x-forwarded-host":"gatxxxxxxxxx.gm","x-forwarded-prefix":"/xxxxxx","trace-id":"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48","x-ca-api-id":"1418470181321347075","x-ca-env-code":"TEST"}],"appName":"超级管理员","responseTime":15,"serverName":"test-server","appkey":"a62d54b6bxxxxxxxxxxxxxxxxxxx37","time":"2021-08-01 12:26:04.062","responseStatus":200,"url":"/test/v4/orgs/123/list-children","token":"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899"}

上面是很常见的一条java程序的日志,我们首先想格式化此日志,然后取出里面的请求 body,也就是里面的一条json

{"traceId":"edda5da8xxxxxxxxxxxxxxxxxxx387d48","headers":[{"x-forwarded-proto":"http,http","x-tenant-id":"123","x-ca-key":"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637","x-forwarded-port":"80,80","x-forwarded-for":"10.244.2.0","x-ca-client-ip":"10.244.2.0","x-product-code":"xxxxx","authorization":"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899","x-forwarded-host":"gatxxxxxxxxx.gm","x-forwarded-prefix":"/xxxxxx","trace-id":"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48","x-ca-api-id":"1418470181321347075","x-ca-env-code":"TEST"}],"appName":"超级管理员","responseTime":15,"serverName":"test-server","appkey":"a62d54b6bxxxxxxxxxxxxxxxxxxx37","time":"2021-08-01 12:26:04.062","responseStatus":200,"url":"/test/v4/orgs/123/list-children","token":"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899"}

取出来之后,我们希望在 elasticsearch 里能根据指定的字段进行快速查询和聚合,因此需要对这段 json 进行重新解析,把里面的 k,v 都放到顶层,另外这段json里面还有一部分嵌套的数组,我们希望将数组中的 map 解析出来,并放到最外层中,最后将里面的一些字符串转换成整型的数据结构。

为了方便调试,这里重新启动了一个 pod,并指定一个了最简单的配置,将日志输出到控制台上,方便调试

apiVersion: apps/v1 kind: Deployment metadata: name: logstash-debug spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: app: logstash-debug strategy: rollingUpdate: maxSurge: 25% maxUnavailable: 25% type: RollingUpdate template: metadata: labels: app: logstash-debug spec: containers: - args: - sleep 1000000000000 command: - /bin/sh - -c image: docker.elastic.co/logstash/logstash:7.12.0 imagePullPolicy: IfNotPresent name: logstash-debug resources: limits: cpu: "4" memory: 4G requests: cpu: "4" memory: 4G terminationMessagePath: /dev/termination-log terminationMessagePolicy: File dnsPolicy: ClusterFirst restartPolicy: Always schedulerName: default-scheduler securityContext: runAsUser: 0 terminationGracePeriodSeconds: 30

pod 启动成功之后,我们直接指定配置文件

# debug.conf input { file { path => ["/var/log/test.log"] start_position => "beginning" sincedb_path => "/dev/null" } } filter { } output { stdout { codec => rubydebug } }

启动

logstash -f debug.conf

随后将上面的那条日志写道/var/log/test.log中

最终控制台输出结果

{ "host" => "logstash-debug-649dcb789c-n9866", "path" => "/var/log/test.log", "@timestamp" => 2021-08-01T06:46:43.292Z, "@version" => "1", "message" => "2021-08-01 12:26:04.063 INFO 24 --- [traceId=edda5daxxxxxxxxxcfa3387d48] [ XNIO-1 task-1] c.g.c.gateway.filter.AutoTestFilter : {\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超级管理员\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}" } 4.2 一步步的去解析日志

使用 logstash 对原始日志进行日志格式化,这应该算是最常见的一种需求了,下面将通过filter中的grok来进行日志格式话,下面以上面的日志为例,我们来通过自定义日志格式,然后最终获取日志里面的一段 json 日志,也就是这一段{"traceId":"edda5da8xxxxxxxxxxxxxxxxxxx387d48","headers":[{"x-forwarded-proto":"http,http","x-tenant-id":"123","x-ca-key":"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637","x-forwarded-port":"80,80","x-forwarded-for":"10.244.2.0","x-ca-client-ip":"10.244.2.0","x-product-code":"xxxxx","authorization":"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899","x-forwarded-host":"gatxxxxxxxxx.gm","x-forwarded-prefix":"/xxxxxx","trace-id":"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48","x-ca-api-id":"1418470181321347075","x-ca-env-code":"TEST"}],"appName":"超级管理员","responseTime":15,"serverName":"test-server","appkey":"a62d54b6bxxxxxxxxxxxxxxxxxxx37","time":"2021-08-01 12:26:04.062","responseStatus":200,"url":"/test/v4/orgs/123/list-children","token":"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899"}

4.2.1 首先进行日志格式化,取出我们想要的日志

grok 官方参考文档: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html grok 调试工具:https://grokdebug.herokuapp.com/

在上面的工具调试后,会将调试结果一并输出,如下图所示:

image-20210801160246344

下面是放到 logstash 中的配置段

filter { grok { match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?traceId=.*)\] \[ (?.*)\] (?[a-z0-9A-Z.]+) : (?{".*"}$)'} } }

这里格式化的就是message中的日志,通过一堆正则,然后来匹配出我们想要的关键日志,匹配结果如下:

{ "message" => "2021-08-01 12:26:04.063 INFO 24 --- [traceId=edda5daxxxxxxxxxcfa3387d48] [ XNIO-1 task-1] c.g.c.gateway.filter.AutoTestFilter : {\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超级管理员\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}", "id" => "24", "Nio" => " XNIO-1 task-1", "@timestamp" => 2021-08-01T07:25:09.041Z, "filter" => "c.g.c.gateway.filter.AutoTestFilter", "traceId" => "traceId=edda5daxxxxxxxxxcfa3387d48", "timeFlag" => "2021-08-01 12:26:04.063", "path" => "/var/log/test.log", "originBody" => "{\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超级管理员\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}", "@version" => "1", "host" => "logstash-debug-649dcb789c-n9866", "logLevel" => "INFO" } 4.2.1 删除不必要的字段

经过处理之后,我们可以看到新加了一个字段名叫做originBody,我们真正想要的就是这段,其他的字段都不需要,因此把没有用的字段删除, 这里用到了mutate中的remove_field来删除字段,关于该字段的具体使用可以参考其官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-remove_field

filter { grok { match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?traceId=.*)\] \[ (?.*)\] (?[a-z0-9A-Z.]+) : (?{".*"}$)'} } mutate { remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter"] } }

经过此次处理后,会去掉message字段,结果如下所示:

{ "path" => "/var/log/test.log", "originBody" => "{\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超级管理员\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}", "@version" => "1", "@timestamp" => 2021-08-01T07:30:17.548Z, "host" => "logstash-debug-649dcb789c-n9866", } 4.2.2 将所需日志进行 json 解析

然后我们想将originBody这个json中的字段放到顶层中,这里用到了filter中的json选项,用来解析json数据类型的日志,这里面有两个关键字段需要知道:

source: 指定要处理的 json 字段,这里对应的就是originBody target: 解析后的 json 数据存放位置,如果不指定将输出到顶层, 由于我这里就是要将解析好的数据放到顶层,因此不指定target filter { grok { match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?traceId=.*)\] \[ (?.*)\] (?[a-z0-9A-Z.]+) : (?{".*"}$)'} } json { source => "originBody" } mutate { remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter", "originBody"] } }

处理结果如下

{ "@version" => "1", "serverName" => "test-server", "time" => "2021-08-01 12:26:04.062", "appkey" => "a62d54b6bxxxxxxxxxxxxxxxxxxx37", "responseStatus" => 200, "url" => "/test/v4/orgs/123/list-children", "headers" => [ [0] { "x-tenant-id" => "123", "x-ca-env-code" => "TEST", "x-ca-key" => "a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637", "authorization" => "bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899", "x-product-code" => "xxxxx", "x-ca-client-ip" => "10.244.2.0", "x-forwarded-host" => "gatxxxxxxxxx.gm", "x-forwarded-prefix" => "/xxxxxx", "x-forwarded-for" => "10.244.2.0", "x-ca-api-id" => "1418470181321347075", "x-forwarded-proto" => "http,http", "trace-id" => "edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48", "x-forwarded-port" => "80,80" } ], "host" => "logstash-debug-649dcb789c-n9866", "responseTime" => 15, "token" => "bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899", "appName" => "超级管理员", "path" => "/var/log/test.log", "@timestamp" => 2021-08-01T07:50:26.403Z } 4.2.3 优化数组的结构

基本上到这里我们想要的数据差不多都呈现出来了,但是可以看到headers这个是个数组,而里面的元素是一个map,我们需要将数组中的 map 给解析到外层,这里使用的是split这个选项,使用也很简单,具体可参考官方文档: https://www.elastic.co/guide/en/logstash/current/plugins-filters-split.html

filter { grok { match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?traceId=.*)\] \[ (?.*)\] (?[a-z0-9A-Z.]+) : (?{".*"}$)'} } json { source => "originBody" } split { field => "headers" } mutate { remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter", "originBody"] } }

处理完之后,结果如下:

{ "appName" => "超级管理员", "serverName" => "test-server", "@version" => "1", "url" => "/test/v4/orgs/123/list-children", "time" => "2021-08-01 12:26:04.062", "token" => "bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899", "@timestamp" => 2021-08-01T07:55:01.353Z, "appkey" => "a62d54b6bxxxxxxxxxxxxxxxxxxx37", "path" => "/var/log/test.log", "responseTime" => 15, "responseStatus" => 200, "headers" => { "x-forwarded-proto" => "http,http", "x-product-code" => "xxxxx", "x-ca-client-ip" => "10.244.2.0", "authorization" => "bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899", "x-ca-key" => "a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637", "x-forwarded-for" => "10.244.2.0", "trace-id" => "edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48", "x-forwarded-host" => "gatxxxxxxxxx.gm", "x-forwarded-prefix" => "/xxxxxx", "x-forwarded-port" => "80,80", "x-tenant-id" => "123", "x-ca-env-code" => "TEST", "x-ca-api-id" => "1418470181321347075" }, "host" => "logstash-debug-649dcb789c-n9866" } 4.2.4 转换数据类型

嗯,已经满足了,接下来是最后一步,将某些字段的字符串转成整型

filter { grok { match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?traceId=.*)\] \[ (?.*)\] (?[a-z0-9A-Z.]+) : (?{".*"}$)'} } json { source => "originBody" } split { field => "headers" } mutate { remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter", "originBody"] convert => { "responseStatus" => "integer" "responseTime" => "integer" } } }

最终结果

{ "appName" => "超级管理员", "token" => "bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899", "responseTime" => 15, "path" => "/var/log/test.log", "headers" => { "x-forwarded-host" => "gatxxxxxxxxx.gm", "trace-id" => "edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48", "x-ca-key" => "a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637", "x-forwarded-prefix" => "/xxxxxx", "x-ca-api-id" => "1418470181321347075", "x-ca-client-ip" => "10.244.2.0", "x-forwarded-for" => "10.244.2.0", "x-forwarded-port" => "80,80", "authorization" => "bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899", "x-ca-env-code" => "TEST", "x-forwarded-proto" => "http,http", "x-tenant-id" => "123", "x-product-code" => "xxxxx" }, "appkey" => "a62d54b6bxxxxxxxxxxxxxxxxxxx37", "time" => "2021-08-01 12:26:04.062", "@version" => "1", "responseStatus" => 200, "serverName" => "test-server", "url" => "/test/v4/orgs/123/list-children", "@timestamp" => 2021-08-01T07:57:54.071Z, "host" => "logstash-debug-649dcb789c-n9866" }

到这里就大功告成了

5. 总结 这篇文章只说了logstash的其中一种日志处理方式,用的是它自带的一些插件,基本上可以满足我们日常的一些需求,但是如果加入一些逻辑处理的话,我们也可以通过自定义ruby代码段来进行处理,下一篇文章将介绍结合ruby的日志处理。

欢迎各位朋友关注我的公众号,来一起学习进步哦 images



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3